Skip to content

Conversation

@vaibhavtiwari33
Copy link
Contributor

@vaibhavtiwari33 vaibhavtiwari33 commented Jan 26, 2026

Fixes: #205

Notes

Change in OnSuccess message metadata

With the introduction of a shared UserMetadata class, the userMetadata member in Message class for OnSuccess message has been changed from:

   private final HashMap<String, KeyValueGroup> userMetadata;

to

    private final UserMetadata userMetadata;

This is done so that the users can utilize the same UserMetadata for onSuccess message without conversion.

Furthermore, the new UserMetadata has been implemented in its current form since it allows parity with other SDKs (inner data holds Map of Map, instead of Map of KeyValueGroup) and also prevents the extra step of converting proto KeyValueGroup object to local KeyValueGroup class object and back.

Format of userMetadata and systemMetadata

Both UserMetadata and SystemMetadata classes have inner data in the format:

Map<String, Map<String, byte[]>> data;

@codecov
Copy link

codecov bot commented Jan 26, 2026

Codecov Report

❌ Patch coverage is 72.45509% with 46 lines in your changes missing coverage. Please review.
⚠️ Please upload report for BASE (main@3180f88). Learn more about missing BASE report.

Files with missing lines Patch % Lines
...main/java/io/numaproj/numaflow/sinker/Message.java 5.55% 16 Missing and 1 partial ⚠️
...ain/java/io/numaproj/numaflow/sourcer/Message.java 23.07% 9 Missing and 1 partial ⚠️
...o/numaproj/numaflow/sourcetransformer/Message.java 28.57% 4 Missing and 1 partial ⚠️
...java/io/numaproj/numaflow/shared/UserMetadata.java 94.20% 0 Missing and 4 partials ⚠️
...ain/java/io/numaproj/numaflow/sinker/Response.java 25.00% 3 Missing ⚠️
...va/io/numaproj/numaflow/shared/SystemMetadata.java 92.30% 1 Missing and 1 partial ⚠️
...java/io/numaproj/numaflow/mapper/HandlerDatum.java 50.00% 1 Missing ⚠️
.../java/io/numaproj/numaflow/mapper/MapperActor.java 80.00% 0 Missing and 1 partial ⚠️
.../numaproj/numaflow/sourcer/OutputObserverImpl.java 0.00% 0 Missing and 1 partial ⚠️
...aproj/numaflow/sourcetransformer/HandlerDatum.java 50.00% 1 Missing ⚠️
... and 1 more
Additional details and impacted files
@@           Coverage Diff           @@
##             main     #210   +/-   ##
=======================================
  Coverage        ?   60.95%           
  Complexity      ?      559           
=======================================
  Files           ?      155           
  Lines           ?     3563           
  Branches        ?      252           
=======================================
  Hits            ?     2172           
  Misses          ?     1212           
  Partials        ?      179           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@vaibhavtiwari33 vaibhavtiwari33 changed the title chore: Metadata propagation in source, source transformer, mapper, sink feat: Metadata propagation in source, source transformer, mapper, sink Jan 26, 2026
@vaibhavtiwari33 vaibhavtiwari33 self-assigned this Jan 26, 2026
@vaibhavtiwari33 vaibhavtiwari33 added the enhancement New feature or request label Jan 26, 2026
vtiwari5 and others added 3 commits January 26, 2026 11:00
@vaibhavtiwari33 vaibhavtiwari33 marked this pull request as ready for review January 26, 2026 23:20
Copy link
Contributor

@yhl25 yhl25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Use function overloading and introduce a method for constructing on success response using the Datum.
  • Remove onSuccess function for proto and use a private method.
  • Add an example(could be a follow-up PR).

@vaibhavtiwari33 vaibhavtiwari33 requested a review from yhl25 January 28, 2026 19:07
log.info("Writing to onSuccess sink: {}", datum.getId());
// Build the onSuccess message using builder for changing values, keys or userMetadata
responseListBuilder.addResponse(Response.responseOnSuccess(datum.getId(),
Message.builder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Value is a mandatory field, lets stick to constructor now. We can add support for builders eventually.

* @param message The message object to convert into the relevant proto object
* @return The converted proto object
*/
public static SinkOuterClass.SinkResponse.Result.Message toProto(Message message) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need expose this to users, Can we make it package private or move to a dedicate private helper method?

*/
public static Response responseOnSuccess(String id, SinkResponse.Result.Message onSuccessMessage) {
return new Response(id, false, null, false, false, null, true, onSuccessMessage);
public static Response responseOnSuccess(Datum datum) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we already have Message.fromDatum. I think this can be avoided.

Comment on lines +103 to +115
@Test
public void testAddKV_ignoresNulls() {
UserMetadata metadata = new UserMetadata();

metadata.addKV(null, "k", "v".getBytes());
metadata.addKV("g", null, "v".getBytes());
metadata.addKV("g", "k", null);

assertTrue(metadata.getGroups().isEmpty());
}

@Test
public void testAddKV_defensiveCopy() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merge tests to cover different cases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Java SDK Changes to expose Metadata

3 participants